#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for the pipeline options validator module."""

# pytype: skip-file

import logging
import unittest

import pytest
from hamcrest import assert_that
from hamcrest import contains_string
from hamcrest import only_contains
from hamcrest.core.base_matcher import BaseMatcher
from parameterized import parameterized

from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator


# Mock runners to use for validations.
class MockRunners(object):
  class DataflowRunner(object):
    def get_default_gcp_region(self):
      # Return a default so we don't have to specify --region in every test
      # (unless specifically testing it).
      return 'us-central1'

  class TestDataflowRunner(DataflowRunner):
    pass

  class OtherRunner(object):
    pass


# Matcher that always passes for testing on_success_matcher option
class AlwaysPassMatcher(BaseMatcher):
  def _matches(self, item):
    return True


class SetupTest(unittest.TestCase):
  def check_errors_for_arguments(self, errors, args):
    """Checks that there is exactly one error for each given argument."""
    missing = []
    remaining = list(errors)

    for arg in args:
      found = False
      for error in remaining:
        if arg in error:
          remaining.remove(error)
          found = True
          break
      if not found:
        missing.append('Missing error for: %s.' % arg)

    # Return missing and remaining (not matched) errors.
    return missing + remaining

  def test_local_runner(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions([])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)

  def test_missing_required_options(self):
    options = PipelineOptions([''])
    runner = MockRunners.DataflowRunner()
    # Remove default region for this test.
    runner.get_default_gcp_region = lambda: None
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()

    self.assertEqual(
        self.check_errors_for_arguments(
            errors, ['project', 'staging_location', 'temp_location', 'region']),
        [])

  @parameterized.expand([
      (None, 'gs://foo/bar',
       []), (None, None, ['staging_location', 'temp_location']),
      ('gs://foo/bar', None, []), ('gs://foo/bar', 'gs://ABC/bar',
                                   []), ('gcs:/foo/bar', 'gs://foo/bar', []),
      ('gs:/foo/bar', 'gs://foo/bar', []), ('gs://ABC/bar', 'gs://foo/bar', []),
      ('gs://ABC/bar', 'gs://BCD/bar', ['temp_location', 'staging_location'
                                        ]), ('gs://foo', 'gs://foo/bar', []),
      ('gs://foo/', 'gs://foo/bar', []), ('gs://foo/bar', 'gs://foo/bar', [])
  ])
  @unittest.skip('Not compatible with new GCS client. See GH issue #26335.')
  def test_gcs_path(self, temp_location, staging_location, expected_error_args):
    def get_validator(_temp_location, _staging_location):
      options = ['--project=example:example', '--job_name=job']

      if _temp_location is not None:
        options.append('--temp_location=' + _temp_location)

      if _staging_location is not None:
        options.append('--staging_location=' + _staging_location)

      pipeline_options = PipelineOptions(options)
      runner = MockRunners.DataflowRunner()
      validator = PipelineOptionsValidator(pipeline_options, runner)
      return validator

    errors = get_validator(temp_location, staging_location).validate()
    self.assertEqual(
        self.check_errors_for_arguments(errors, expected_error_args), [])

  @parameterized.expand([(None, ['project']), ('12345', ['project']),
                         ('FOO', ['project']), ('foo:BAR', ['project']),
                         ('fo', ['project']), ('foo', []), ('foo:bar', [])])
  def test_project(self, project, expected_error_args):
    def get_validator(_project):
      options = [
          '--job_name=job',
          '--staging_location=gs://foo/bar',
          '--temp_location=gs://foo/bar'
      ]

      if _project is not None:
        options.append('--project=' + _project)

      pipeline_options = PipelineOptions(options)
      runner = MockRunners.DataflowRunner()
      validator = PipelineOptionsValidator(pipeline_options, runner)
      return validator

    errors = get_validator(project).validate()
    self.assertEqual(
        self.check_errors_for_arguments(errors, expected_error_args), [])

  @parameterized.expand([(None, []), ('12345', ['job_name']),
                         ('FOO', ['job_name']), ('foo:bar', ['job_name']),
                         ('fo', []), ('foo', [])])
  def test_job_name(self, job_name, expected_error_args):
    def get_validator(_job_name):
      options = [
          '--project=example:example',
          '--staging_location=gs://foo/bar',
          '--temp_location=gs://foo/bar'
      ]

      if _job_name is not None:
        options.append('--job_name=' + _job_name)

      pipeline_options = PipelineOptions(options)
      runner = MockRunners.DataflowRunner()
      validator = PipelineOptionsValidator(pipeline_options, runner)
      return validator

    errors = get_validator(job_name).validate()
    self.assertEqual(
        self.check_errors_for_arguments(errors, expected_error_args), [])

  @parameterized.expand([(None, []), ('1', []), ('0', ['num_workers']),
                         ('-1', ['num_workers'])])
  def test_num_workers(self, num_workers, expected_error_args):
    def get_validator(_num_workers):
      options = [
          '--project=example:example',
          '--job_name=job',
          '--staging_location=gs://foo/bar',
          '--temp_location=gs://foo/bar'
      ]

      if _num_workers is not None:
        options.append('--num_workers=' + _num_workers)

      pipeline_options = PipelineOptions(options)
      runner = MockRunners.DataflowRunner()
      validator = PipelineOptionsValidator(pipeline_options, runner)
      return validator

    errors = get_validator(num_workers).validate()
    self.assertEqual(
        self.check_errors_for_arguments(errors, expected_error_args), [])

  @parameterized.expand([
      (
          MockRunners.OtherRunner(),
          [],
          False,
      ),
      (
          MockRunners.OtherRunner(),
          ['--dataflow_endpoint=https://dataflow.googleapis.com'],
          False,
      ),
      (
          MockRunners.OtherRunner(),
          ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
          False,
      ), (
          MockRunners.DataflowRunner(),
          [],
          True,
      ),
      (
          MockRunners.DataflowRunner(),
          ['--dataflow_endpoint=https://another.service.com'],
          True,
      ),
      (
          MockRunners.DataflowRunner(),
          ['--dataflow_endpoint=https://dataflow.googleapis.com'],
          True,
      ),
      (
          MockRunners.DataflowRunner(),
          ['--dataflow_endpoint=http://localhost:1000'],
          False,
      ),
      (
          MockRunners.DataflowRunner(),
          ['--dataflow_endpoint=foo: //dataflow. googleapis. com'],
          True,
      ), (
          MockRunners.DataflowRunner(),
          [],
          True,
      )
  ])
  def test_is_service_runner(self, runner, options, expected):
    validator = PipelineOptionsValidator(PipelineOptions(options), runner)
    self.assertEqual(validator.is_service_runner(), expected)

  def test_pickle_library_dill_not_installed_returns_error(self):
    runner = MockRunners.OtherRunner()
    # Remove default region for this test.
    options = PipelineOptions(['--pickle_library=dill'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1, errors)
    self.assertIn("Option pickle_library=dill requires dill", errors[0])

  @pytest.mark.uses_dill
  def test_pickle_library_dill_installed_returns_no_error(self):
    pytest.importorskip("dill")
    runner = MockRunners.OtherRunner()
    # Remove default region for this test.
    options = PipelineOptions(['--pickle_library=dill'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0, errors)

  @pytest.mark.uses_dill
  def test_pickle_library_dill_installed_returns_wrong_version(self):
    pytest.importorskip("dill")
    with unittest.mock.patch('dill.__version__', '0.3.6'):
      runner = MockRunners.OtherRunner()
      # Remove default region for this test.
      options = PipelineOptions(['--pickle_library=dill'])
      validator = PipelineOptionsValidator(options, runner)
      errors = validator.validate()
      self.assertEqual(len(errors), 1, errors)
      self.assertIn("Dill version 0.3.1.1 is required when using ", errors[0])

  @pytest.mark.uses_dill
  def test_pickle_library_dill_unsafe_no_error(self):
    pytest.importorskip("dill")
    with unittest.mock.patch('dill.__version__', '0.3.6'):
      runner = MockRunners.OtherRunner()
      # Remove default region for this test.
      options = PipelineOptions(['--pickle_library=dill_unsafe'])
      validator = PipelineOptionsValidator(options, runner)
      errors = validator.validate()
      self.assertEqual(len(errors), 0, errors)

  def test_dataflow_job_file_and_template_location_mutually_exclusive(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions(
        ['--template_location', 'abc', '--dataflow_job_file', 'def'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertTrue(errors)

  def test_validate_template_location(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions([
        '--template_location',
        'abc',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertFalse(errors)

  def test_validate_dataflow_job_file(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions(['--dataflow_job_file', 'abc'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertFalse(errors)

  def test_num_workers_is_positive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--num_workers=-1',
        '--worker_region=us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('num_workers', errors[0])
    self.assertIn('-1', errors[0])

  def test_max_num_workers_is_positive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--max_num_workers=-1',
        '--worker_region=us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('max_num_workers', errors[0])
    self.assertIn('-1', errors[0])

  def test_num_workers_cannot_exceed_max_num_workers(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--num_workers=43',
        '--max_num_workers=42',
        '--worker_region=us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('num_workers', errors[0])
    self.assertIn('43', errors[0])
    self.assertIn('max_num_workers', errors[0])
    self.assertIn('42', errors[0])

  def test_num_workers_can_equal_max_num_workers(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--num_workers=42',
        '--max_num_workers=42',
        '--worker_region=us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)

  def test_zone_and_worker_region_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--zone',
        'us-east1-b',
        '--worker_region',
        'us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('zone', errors[0])
    self.assertIn('worker_region', errors[0])

  def test_zone_and_worker_zone_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--zone',
        'us-east1-b',
        '--worker_zone',
        'us-east1-c',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('zone', errors[0])
    self.assertIn('worker_zone', errors[0])

  def test_experiment_region_and_worker_region_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--experiments',
        'worker_region=us-west1',
        '--worker_region',
        'us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('experiment', errors[0])
    self.assertIn('worker_region', errors[0])

  def test_region_and_worker_zone_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--worker_region=us-west1',
        '--worker_zone=us-east1-b',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('worker_region', errors[0])
    self.assertIn('worker_zone', errors[0])

  def test_programmatically_set_experiment_passed_as_string(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions(
        project='example.com:example',
        temp_location='gs://foo/bar/',
        experiments='enable_prime',
        dataflow_service_options='use_runner_v2',
    )
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 2)
    self.assertIn('experiments', errors[0])
    self.assertIn('dataflow_service_options', errors[1])

  def test_programmatically_set_experiment_passed_as_list(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions(
        project='example.com:example',
        temp_location='gs://foo/bar/',
        experiments=['enable_prime'],
        dataflow_service_options=['use_runner_v2'],
    )
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)
    self.assertEqual(
        options.view_as(DebugOptions).experiments, ['enable_prime'])
    self.assertEqual(
        options.view_as(GoogleCloudOptions).dataflow_service_options,
        ['use_runner_v2'])

  def test_worker_region_and_worker_zone_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--worker_region',
        'us-east1',
        '--worker_zone',
        'us-east1-b',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('worker_region', errors[0])
    self.assertIn('worker_zone', errors[0])

  def test_zone_alias_worker_zone(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--zone=us-east1-b',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)
    self.assertIsNone(options.view_as(WorkerOptions).zone)
    self.assertEqual(options.view_as(WorkerOptions).worker_zone, 'us-east1-b')

  def test_region_optional_for_non_service_runner(self):
    runner = MockRunners.OtherRunner()
    # Remove default region for this test.
    runner.get_default_gcp_region = lambda: None
    options = PipelineOptions([
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)

  def test_dataflow_endpoint_is_a_url(self):
    runner = MockRunners.DataflowRunner()
    # Remove default region for this test.
    options = PipelineOptions([
        '--project=example:example',
        '--temp_location=gs://foo/bar',
        '--staging_location=gs://foo/baz',
        '--dataflow_endpoint=foo and bar'
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1, errors)
    self.assertIn("Invalid url (foo and bar)", errors[0])

  def test_alias_sdk_container_to_worker_harness(self):
    runner = MockRunners.DataflowRunner()
    test_image = "SDK_IMAGE"
    options = PipelineOptions([
        '--sdk_container_image=%s' % test_image,
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)
    self.assertEqual(
        options.view_as(WorkerOptions).worker_harness_container_image,
        test_image)
    self.assertEqual(
        options.view_as(WorkerOptions).sdk_container_image, test_image)

  def test_alias_worker_harness_sdk_container_image(self):
    runner = MockRunners.DataflowRunner()
    test_image = "WORKER_HARNESS"
    options = PipelineOptions([
        '--worker_harness_container_image=%s' % test_image,
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)
    self.assertEqual(
        options.view_as(WorkerOptions).worker_harness_container_image,
        test_image)
    self.assertEqual(
        options.view_as(WorkerOptions).sdk_container_image, test_image)

  def test_worker_harness_sdk_container_image_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--worker_harness_container_image=WORKER',
        '--sdk_container_image=SDK_ONLY',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('sdk_container_image', errors[0])
    self.assertIn('worker_harness_container_image', errors[0])

  def test_prebuild_sdk_container_base_image_disallowed(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--project=example:example',
        '--temp_location=gs://foo/bar',
        '--prebuild_sdk_container_base_image=gcr.io/foo:bar'
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('prebuild_sdk_container_base_image', errors[0])
    self.assertIn('sdk_container_image', errors[0])

  def test_prebuild_sdk_container_base_allowed_if_matches_custom_image(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--project=example:example',
        '--temp_location=gs://foo/bar',
        '--sdk_container_image=gcr.io/foo:bar',
        '--prebuild_sdk_container_base_image=gcr.io/foo:bar'
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)

  @parameterized.expand([(None, []), (pickler.dumps(AlwaysPassMatcher()), []),
                         (b'abc', ['on_success_matcher']),
                         (pickler.dumps(object), ['on_success_matcher'])])
  def test_test_matcher(self, on_success_matcher, errors):
    def get_validator(matcher):
      options = [
          '--project=example:example',
          '--job_name=job',
          '--staging_location=gs://foo/bar',
          '--temp_location=gs://foo/bar',
      ]
      if matcher:
        options.append('%s=%s' % ('--on_success_matcher', matcher.decode()))

      pipeline_options = PipelineOptions(options)
      runner = MockRunners.TestDataflowRunner()
      return PipelineOptionsValidator(pipeline_options, runner)

    errors = get_validator(on_success_matcher).validate()
    self.assertEqual(self.check_errors_for_arguments(errors, errors), [])

  def test_transform_name_mapping_without_update(self):
    options = [
        '--project=example:example',
        '--staging_location=gs://foo/bar',
        '--temp_location=gs://foo/bar',
        '--transform_name_mapping={\"fromPardo\":\"toPardo\"}'
    ]

    pipeline_options = PipelineOptions(options)
    runner = MockRunners.DataflowRunner()
    validator = PipelineOptionsValidator(pipeline_options, runner)
    errors = validator.validate()
    assert_that(
        errors,
        only_contains(
            contains_string(
                'Transform name mapping option is only useful when '
                '--update and --streaming is specified')))

  def test_transform_name_mapping_invalid_format(self):
    options = [
        '--project=example:example',
        '--staging_location=gs://foo/bar',
        '--temp_location=gs://foo/bar',
        '--update',
        '--job_name=test',
        '--streaming',
        '--transform_name_mapping={\"fromPardo\":123}'
    ]

    pipeline_options = PipelineOptions(options)
    runner = MockRunners.DataflowRunner()
    validator = PipelineOptionsValidator(pipeline_options, runner)
    errors = validator.validate()
    assert_that(
        errors,
        only_contains(
            contains_string('Invalid transform name mapping format.')))

  def test_type_check_additional(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions(['--type_check_additional=all'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertFalse(errors)

    options = PipelineOptions(['--type_check_additional='])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertFalse(errors)

  def test_type_check_additional_unrecognized_feature(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions(['--type_check_additional=all,dfgdf'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertTrue(errors)

  @parameterized.expand([
      (['--environment_type=dOcKeR'], []),
      ([
          '--environment_type=dOcKeR',
          '--environment_options=docker_container_image=foo'
      ], []), (['--environment_type=dOcKeR', '--environment_config=foo'], []),
      ([
          '--environment_type=dOcKeR',
          '--environment_options=docker_container_image=foo',
          '--environment_config=foo'
      ], ['environment_config']),
      ([
          '--environment_type=dOcKeR',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=external_service_address=foo'
      ], ['process_command', 'process_variables', 'external_service_address']),
      (['--environment_type=pRoCeSs'], ['process_command']),
      ([
          '--environment_type=pRoCeSs',
          '--environment_options=process_command=foo'
      ], []), (['--environment_type=pRoCeSs', '--environment_config=foo'], []),
      ([
          '--environment_type=pRoCeSs',
          '--environment_options=process_command=foo',
          '--environment_config=foo'
      ], ['environment_config']),
      ([
          '--environment_type=pRoCeSs',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=docker_container_image=foo',
          '--environment_options=external_service_address=foo'
      ], ['docker_container_image', 'external_service_address']),
      (['--environment_type=eXtErNaL'], ['external_service_address']),
      ([
          '--environment_type=eXtErNaL',
          '--environment_options=external_service_address=foo'
      ], []), (['--environment_type=eXtErNaL', '--environment_config=foo'], []),
      ([
          '--environment_type=eXtErNaL',
          '--environment_options=external_service_address=foo',
          '--environment_config=foo'
      ], ['environment_config']),
      ([
          '--environment_type=eXtErNaL',
          '--environment_options=external_service_address=foo',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=docker_container_image=foo'
      ], ['process_command', 'process_variables',
          'docker_container_image']), (['--environment_type=lOoPbACk'], []),
      (['--environment_type=lOoPbACk',
        '--environment_config=foo'], ['environment_config']),
      ([
          '--environment_type=lOoPbACk',
          '--environment_options=docker_container_image=foo',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=external_service_address=foo'
      ],
       [
           'docker_container_image',
           'process_command',
           'process_variables',
           'external_service_address'
       ]), (['--environment_type=beam:env:foo:v1'], []),
      (['--environment_type=beam:env:foo:v1', '--environment_config=foo'], []),
      ([
          '--environment_type=beam:env:foo:v1',
          '--environment_options=docker_container_image=foo',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=external_service_address=foo'
      ],
       [
           'docker_container_image',
           'process_command',
           'process_variables',
           'external_service_address'
       ]),
      ([
          '--environment_options=docker_container_image=foo',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=external_service_address=foo'
      ],
       [
           'docker_container_image',
           'process_command',
           'process_variables',
           'external_service_address'
       ])
  ])
  def test_environment_options(self, options, expected_error_args):
    errors = []
    validator = PipelineOptionsValidator(
        PipelineOptions(options), MockRunners.OtherRunner())
    validation_result = validator.validate()
    validation_errors = self.check_errors_for_arguments(
        validation_result, expected_error_args)
    if validation_errors:
      errors.append(
          'Options "%s" had unexpected validation results: "%s"' %
          (' '.join(options), ' '.join(validation_errors)))
    self.assertEqual(errors, [], expected_error_args)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
